package com.niit.streaming

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

/**
 * Demo of a queue-backed DStream: RDDs pushed into an in-memory queue are
 * consumed by Spark Streaming, and the occurrences of each distinct value
 * are counted and printed per batch.
 */
object Spark_Stream_Queue {

  /**
   * Starts a local streaming job with a 3-second batch interval, feeds it five
   * RDDs (each holding the integers 1 to 100 in 10 partitions) at 2-second
   * intervals, and then blocks until the streaming context terminates.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    // Initialize the Spark configuration
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")
    // Initialize the StreamingContext with a 3-second batch interval
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    ssc.sparkContext.setLogLevel("ERROR")
    // Create the RDD queue that backs the input stream
    val rddQueue = new mutable.Queue[RDD[Int]]()
    // Create the queue-backed input DStream
    val inputStream = ssc.queueStream(rddQueue, oneAtATime = false)
    // Process the queued RDD data: pair every value with 1, then sum per key
    val mappedStream = inputStream.map((_, 1))
    val reduceStream = mappedStream.reduceByKey(_ + _)
    // Print the results
    reduceStream.print()

    // Start the streaming computation
    ssc.start()
    // Repeatedly create RDDs and append them to the queue
    (1 to 5).foreach { _ =>
      val batch = ssc.sparkContext.makeRDD(1 to 100, 10)
      // mutable.Queue is not thread-safe and the streaming side reads this
      // queue after start(); lock on the queue while appending, as the
      // official Spark QueueStream example does.
      rddQueue.synchronized {
        rddQueue += batch
      }
      Thread.sleep(2000)
    }

    // Block the main thread until the streaming context is stopped
    ssc.awaitTermination()
  }

}
